package org.luosl.webmagicx

import java.util

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives.{complete, get, path, _}
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import org.luosl.webmagicx.conf.SpiderConf
import org.luosl.webmagicx.listeners.{GeneralPipelineListener, GeneralProcessorListener, GeneralSpiderListener}
import org.luosl.webmagicx.pipeline.BasePipeline
import org.luosl.webmagicx.processor.BaseProcessor
import play.api.libs.json.{JsObject, Json}
import us.codecraft.webmagic.Spider.Status
import us.codecraft.webmagic.{Spider, SpiderListener}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.{ExecutionContextExecutor, Future}

/**
  * HTTP-based spider monitor.
  *
  * An Akka HTTP server is bound to `host:port` as soon as an instance is constructed.
  * It serves two JSON endpoints:
  *  - `/index`       : overview of every registered spider
  *  - `/spider/<id>` : messages collected by the listeners of one registered spider
  *
  * Created by luosl on 2017/12/13.
  *
  * @param host interface the HTTP server binds to
  * @param port port the HTTP server binds to
  */
class HttpSpiderMonitor(host:String, port:Int) {

  /** Everything that is tracked for one registered spider. */
  private type Registration =
    (Spider, SpiderConf, GeneralSpiderListener, GeneralProcessorListener, List[GeneralPipelineListener])

  // Written by `register` (caller threads) and read by the route (dispatcher threads),
  // so a plain HashMap is not safe here. Note: ConcurrentHashMap rejects null keys.
  private val registerSpiders: util.Map[String, Registration] =
    new util.concurrent.ConcurrentHashMap[String, Registration]()

  implicit val system:ActorSystem = ActorSystem("webmagics-system")
  implicit val materializer:ActorMaterializer = ActorMaterializer()
  implicit val executionContext:ExecutionContextExecutor = system.dispatcher

  val route:Route =
    path("spider"/Remaining){ sid =>
      // Single lookup: an unknown id yields an error document instead of a second map access.
      val body:String = spiderInfo(sid) match {
        case Some(info) => info.toString()
        case None => Json.obj("code"->"500", "msg"->s"无效的spiderId:$sid").toString()
      }
      complete(HttpEntity(ContentTypes.`application/json`, body))
    } ~ path("index"){
      complete(HttpEntity(ContentTypes.`application/json`, overview().toString()))
    }

  val bindingFuture:Future[ServerBinding] = Http().bindAndHandle(route, host, port)

  // A failed bind (e.g. port already in use) would otherwise go completely unnoticed.
  bindingFuture.failed.foreach { cause =>
    system.log.error(cause, "HttpSpiderMonitor failed to bind to {}:{}", host, port)
  }

  /**
    * Registers a spider with the monitor and attaches the monitoring listeners to it.
    *
    * The spider's listener list is replaced by a single [[GeneralSpiderListener]].
    * The page processor is cast to [[BaseProcessor]], so a spider using any other processor
    * type fails with a `ClassCastException`. Pipelines that are not [[BasePipeline]]s are skipped.
    *
    * @param spider spider to monitor
    * @param sc     configuration of the spider; `sc.id` is the key used by `/spider/<id>`
    *               and must not be null
    */
  def register(spider:Spider, sc:SpiderConf): Unit ={

    // Attach the SpiderListener
    val listener:GeneralSpiderListener = {
      val ls:GeneralSpiderListener = new GeneralSpiderListener(spider,sc)
      spider.setSpiderListeners(List(ls.asInstanceOf[SpiderListener]).asJava)
      ls
    }

    // Attach the ProcessorListener
    val processorListener:GeneralProcessorListener = {
      val processor: BaseProcessor = spider.getPageProcessor.asInstanceOf[BaseProcessor]
      val pls = new GeneralProcessorListener(processor.getClass)
      processor.addListener(pls)
      pls
    }

    // Attach one PipelineListener to every pipeline that supports listeners
    val pipelines:List[BasePipeline] =
      spider.getPipelines.asScala.collect { case pipeline: BasePipeline => pipeline }.toList
    val pipelineListeners:List[GeneralPipelineListener] = pipelines.map{ pipeline =>
      val pipelineListener = new GeneralPipelineListener(pipeline.getClass)
      pipeline.addListener(pipelineListener)
      pipelineListener
    }
    registerSpiders.put(sc.id, (spider,sc,listener,processorListener,pipelineListeners))
  }

  /**
    * Stops the HTTP server and the actor system.
    *
    * The server binding is released first so the port is freed before the materializer and
    * the actor system go away. The teardown still runs when the bind itself had failed.
    * The call returns immediately; the teardown completes asynchronously.
    */
  def shutdown(): Unit ={
    bindingFuture
      .flatMap(_.unbind())
      .onComplete { _ =>
        materializer.shutdown()
        system.terminate()
      }
  }

  /**
    * Builds the overview of all registered spiders.
    *
    * @return JSON object with the total spider count, the number of spiders whose status is
    *         `Running`, and one summary object per spider
    */
  private def overview(): JsObject ={
    // One snapshot so the counts and the list are derived from the same set of registrations.
    val registrations:List[Registration] = registerSpiders.values.asScala.toList
    val spiders:List[JsObject] = registrations.map{ case (spider, spiderConfig, _, _, _) =>
      Json.obj(
        "id" -> spiderConfig.id,
        "desc" -> spiderConfig.desc,
        "status" -> spider.getStatus.toString,
        "taskType" -> spiderConfig.taskType,
        "configPath" -> spiderConfig.path
      )
    }
    Json.obj(
      "allSpiderCount" -> registrations.size,
      "runningCount" -> registrations.count(_._1.getStatus == Status.Running),
      "spiders" -> spiders
    )
  }

  /**
    * Collects the messages of all listeners attached to one spider.
    *
    * @param sid id the spider was registered under
    * @return the spider listener's message merged with a `processor` and a `pipelines` field,
    *         or `None` when no spider is registered under `sid`
    */
  private def spiderInfo(sid:String): Option[JsObject] =
    Option(registerSpiders.get(sid)).map{ case (_, _, listener, proListener, pipelineListeners) =>
      val spiderMsg:JsObject = listener.createJsonMsg()
      val processorMsg:JsObject = Json.obj("processor"-> proListener.createJsonMsg())
      // NOTE(review): Json.arr receives the whole list as ONE element, so this appears to emit
      // a nested array ("pipelines": [[...]]). Kept as-is to preserve the response format;
      // confirm with API consumers before flattening.
      val pipelineMsgs:JsObject = Json.obj("pipelines"-> Json.arr(pipelineListeners.map(_.createJsonMsg())))
      spiderMsg ++ processorMsg ++ pipelineMsgs
    }

}
